# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Open a multi-file dataset
#'
#' Arrow Datasets allow you to query against data that has been split across
#' multiple files. This sharding of data may indicate partitioning, which
#' can accelerate queries that only touch some partitions (files). Call
#' `open_dataset()` to point to a directory of data files and return a
#' `Dataset`, then use `dplyr` methods to query it.
#'
#' @section Partitioning:
#'
#' Data is often split into multiple files and nested in subdirectories based on the value of one or more
#' columns in the data. It may be a column that is commonly referenced in
#' queries, or it may be time-based, for some examples. Data that is divided
#' this way is "partitioned," and the values for those partitioning columns are
#' encoded into the file path segments.
#' These path segments are effectively virtual columns in the dataset, and
#' because their values are known prior to reading the files themselves, we can
#' greatly speed up filtered queries by skipping some files entirely.
#'
#' Arrow supports reading partition information from file paths in two forms:
#'
#' * "Hive-style", deriving from the Apache Hive project and common to some
#'   database systems. Partitions are encoded as "key=value" in path segments,
#'   such as `"year=2019/month=1/file.parquet"`. While they may be awkward as
#'   file names, they have the advantage of being self-describing.
#' * "Directory" partitioning, which is Hive without the key names, like
#'   `"2019/01/file.parquet"`. In order to use these, we need know at least
#'   what names to give the virtual columns that come from the path segments.
#'
#' The default behavior in `open_dataset()` is to inspect the file paths
#' contained in the provided directory, and if they look like Hive-style, parse
#' them as Hive. If your dataset has Hive-style partitioning in the file paths,
#' you do not need to provide anything in the `partitioning` argument to
#' `open_dataset()` to use them. If you do provide a character vector of
#' partition column names, they will be ignored if they match what is detected,
#' and if they don't match, you'll get an error. (If you want to rename
#' partition columns, do that using `select()` or `rename()` after opening the
#' dataset.). If you provide a `Schema` and the names match what is detected,
#' it will use the types defined by the Schema. In the example file path above,
#' you could provide a Schema to specify that "month" should be `int8()`
#' instead of the `int32()` it will be parsed as by default.
#'
#' If your file paths do not appear to be Hive-style, or if you pass
#' `hive_style = FALSE`, the `partitioning` argument will be used to create
#' Directory partitioning. A character vector of names is required to create
#' partitions; you may instead provide a `Schema` to map those names to desired
#' column types, as described above. If neither are provided, no partitioning
#' information will be taken from the file paths.
#'
#' @param sources One of:
#'   * a string path or URI to a directory containing data files
#'   * a [FileSystem] that references a directory containing data files
#'     (such as what is returned by [s3_bucket()])
#'   * a string path or URI to a single file
#'   * a character vector of paths or URIs to individual data files
#'   * a list of `Dataset` objects as created by this function
#'   * a list of `DatasetFactory` objects as created by [dataset_factory()].
#'
#' When `sources` is a vector of file URIs, they must all use the same protocol
#' and point to files located in the same file system and having the same
#' format.
#' @param schema [Schema] for the `Dataset`. If `NULL` (the default), the schema
#' will be inferred from the data sources.
#' @param partitioning When `sources` is a directory path/URI, one of:
#'   * a `Schema`, in which case the file paths relative to `sources` will be
#'     parsed, and path segments will be matched with the schema fields.
#'   * a character vector that defines the field names corresponding to those
#'     path segments (that is, you're providing the names that would correspond
#'     to a `Schema` but the types will be autodetected)
#'   * a `Partitioning` or `PartitioningFactory`, such as returned
#'     by [hive_partition()]
#'   * `NULL` for no partitioning
#'
#' The default is to autodetect Hive-style partitions unless
#' `hive_style = FALSE`. See the "Partitioning" section for details.
#' When `sources` is not a directory path/URI, `partitioning` is ignored.
#' @param hive_style Logical: should `partitioning` be interpreted as
#' Hive-style? Default is `NA`, which means to inspect the file paths for
#' Hive-style partitioning and behave accordingly.
#' @param unify_schemas logical: should all data fragments (files, `Dataset`s)
#' be scanned in order to create a unified schema from them? If `FALSE`, only
#' the first fragment will be inspected for its schema. Use this fast path
#' when you know and trust that all fragments have an identical schema.
#' The default is `FALSE` when creating a dataset from a directory path/URI or
#' vector of file paths/URIs (because there may be many files and scanning may
#' be slow) but `TRUE` when `sources` is a list of `Dataset`s (because there
#' should be few `Dataset`s in the list and their `Schema`s are already in
#' memory).
#' @param format A [FileFormat] object, or a string identifier of the format of
#' the files in `x`. This argument is ignored when `sources` is a list of `Dataset` objects.
#' Currently supported values:
#' * "parquet"
#' * "ipc"/"arrow"/"feather", all aliases for each other; for Feather, note that
#'   only version 2 files are supported
#' * "csv"/"text", aliases for the same thing (because comma is the default
#'   delimiter for text files
#' * "tsv", equivalent to passing `format = "text", delimiter = "\t"`
#' * "json", for JSON format datasets Note: only newline-delimited JSON (aka ND-JSON) datasets
#'   are currently supported
#' Default is "parquet", unless a `delimiter` is also specified, in which case
#' it is assumed to be "text".
#' @param ... additional arguments passed to `dataset_factory()` when `sources`
#' is a directory path/URI or vector of file paths/URIs, otherwise ignored.
#' These may include `format` to indicate the file format, or other
#' format-specific options (see [read_csv_arrow()], [read_parquet()] and [read_feather()] on how to specify these).
#' @inheritParams dataset_factory
#' @return A [Dataset] R6 object. Use `dplyr` methods on it to query the data,
#' or call [`$NewScan()`][Scanner] to construct a query directly.
#' @export
#' @seealso \href{https://arrow.apache.org/docs/r/articles/dataset.html}{
#' datasets article}
#' @include arrow-object.R
#' @examplesIf arrow_with_dataset() & arrow_with_parquet()
#' # Set up directory for examples
#' tf <- tempfile()
#' dir.create(tf)
#'
#' write_dataset(mtcars, tf, partitioning = "cyl")
#'
#' # You can specify a directory containing the files for your dataset and
#' # open_dataset will scan all files in your directory.
#' open_dataset(tf)
#'
#' # You can also supply a vector of paths
#' open_dataset(c(file.path(tf, "cyl=4/part-0.parquet"), file.path(tf, "cyl=8/part-0.parquet")))
#'
#' ## You must specify the file format if using a format other than parquet.
#' tf2 <- tempfile()
#' dir.create(tf2)
#' write_dataset(mtcars, tf2, format = "ipc")
#' # This line will results in errors when you try to work with the data
#' \dontrun{
#' open_dataset(tf2)
#' }
#' # This line will work
#' open_dataset(tf2, format = "ipc")
#'
#' ## You can specify file partitioning to include it as a field in your dataset
#' # Create a temporary directory and write example dataset
#' tf3 <- tempfile()
#' dir.create(tf3)
#' write_dataset(airquality, tf3, partitioning = c("Month", "Day"), hive_style = FALSE)
#'
#' # View files - you can see the partitioning means that files have been written
#' # to folders based on Month/Day values
#' tf3_files <- list.files(tf3, recursive = TRUE)
#'
#' # With no partitioning specified, dataset contains all files but doesn't include
#' # directory names as field names
#' open_dataset(tf3)
#'
#' # Now that partitioning has been specified, your dataset contains columns for Month and Day
#' open_dataset(tf3, partitioning = c("Month", "Day"))
#'
#' # If you want to specify the data types for your fields, you can pass in a Schema
#' open_dataset(tf3, partitioning = schema(Month = int8(), Day = int8()))
open_dataset <- function(sources,
                         schema = NULL,
                         partitioning = hive_partition(),
                         hive_style = NA,
                         unify_schemas = NULL,
                         format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text", "json"),
                         factory_options = list(),
                         ...) {
  stop_if_no_datasets()

  if (is_list_of(sources, "Dataset")) {
    if (is.null(schema)) {
      if (is.null(unify_schemas) || isTRUE(unify_schemas)) {
        # Default is to unify schemas here
        schema <- unify_schemas(schemas = map(sources, ~ .$schema))
      } else {
        # Take the first one.
        schema <- sources[[1]]$schema
      }
    }
    # Enforce that all datasets have the same schema
    assert_is(schema, "Schema")
    sources <- lapply(sources, function(x) {
      x$WithSchema(schema)
    })
    return(dataset___UnionDataset__create(sources, schema))
  }

  if (is_false(hive_style) &&
    inherits(partitioning, "PartitioningFactory") &&
    identical(partitioning$type_name, "hive")) {
    # Allow default partitioning arg to be overridden by hive_style = FALSE
    partitioning <- NULL
  }

  factory <- DatasetFactory$create(
    sources,
    partitioning = partitioning,
    format = format,
    schema = schema,
    hive_style = hive_style,
    factory_options = factory_options,
    ...
  )
  tryCatch(
    # Default is _not_ to inspect/unify schemas
    factory$Finish(schema, isTRUE(unify_schemas)),
    # n = 4 because we want the error to show up as being from open_dataset()
    # and not augment_io_error_msg()
    error = function(e, call = caller_env(n = 4)) {
      augment_io_error_msg(e, call, format = format)
    }
  )
}

#' Open a multi-file dataset of CSV or other delimiter-separated format
#'
#' A wrapper around [open_dataset] which explicitly includes parameters mirroring [read_csv_arrow()],
#' [read_delim_arrow()], and [read_tsv_arrow()] to allow for easy switching between functions
#' for opening single files and functions for opening datasets.
#'
#' @inheritParams open_dataset
#' @inheritParams read_delim_arrow
#'
#' @section Options currently supported by [read_delim_arrow()] which are not supported here:
#' * `file` (instead, please specify files in `sources`)
#' * `col_select` (instead, subset columns after dataset creation)
#' * `as_data_frame` (instead, convert to data frame after dataset creation)
#' * `parse_options`
#'
#' @examplesIf arrow_with_dataset()
#' # Set up directory for examples
#' tf <- tempfile()
#' dir.create(tf)
#'
#' df <- data.frame(x = c("1", "2", "NULL"))
#' file_path <- file.path(tf, "file1.txt")
#' write.table(df, file_path, sep = ",", row.names = FALSE)
#'
#' # Use readr-style params identically in both `read_csv_dataset()` and `open_csv_dataset()`
#' read_csv_arrow(file_path, na = c("", "NA", "NULL"), col_names = "y", skip = 1)
#' open_csv_dataset(file_path, na = c("", "NA", "NULL"), col_names = "y", skip = 1)
#'
#' # Use `col_types` to specify a schema, partial schema, or compact representation
#' tf2 <- tempfile()
#' write_csv_dataset(cars, tf2)
#'
#' open_csv_dataset(tf2, col_types = schema(speed = int32(), dist = int32()))
#' open_csv_dataset(tf2, col_types = schema(speed = int32()))
#' open_csv_dataset(tf2, col_types = "ii", col_names = c("speed", "dist"), skip = 1)
#'
#' @seealso [open_dataset()]
#' @export
open_delim_dataset <- function(sources,
                               schema = NULL,
                               partitioning = hive_partition(),
                               hive_style = NA,
                               unify_schemas = NULL,
                               factory_options = list(),
                               delim = ",",
                               quote = "\"",
                               escape_double = TRUE,
                               escape_backslash = FALSE,
                               col_names = TRUE,
                               col_types = NULL,
                               na = c("", "NA"),
                               skip_empty_rows = TRUE,
                               skip = 0L,
                               convert_options = NULL,
                               read_options = NULL,
                               timestamp_parsers = NULL,
                               quoted_na = TRUE,
                               parse_options = NULL) {
  open_dataset(
    sources = sources,
    schema = schema,
    partitioning = partitioning,
    hive_style = hive_style,
    unify_schemas = unify_schemas,
    factory_options = factory_options,
    format = "text",
    delim = delim,
    quote = quote,
    escape_double = escape_double,
    escape_backslash = escape_backslash,
    col_names = col_names,
    col_types = col_types,
    na = na,
    skip_empty_rows = skip_empty_rows,
    skip = skip,
    convert_options = convert_options,
    read_options = read_options,
    timestamp_parsers = timestamp_parsers,
    quoted_na = quoted_na,
    parse_options = parse_options
  )
}

#' @rdname open_delim_dataset
#' @export
open_csv_dataset <- function(sources,
                             schema = NULL,
                             partitioning = hive_partition(),
                             hive_style = NA,
                             unify_schemas = NULL,
                             factory_options = list(),
                             quote = "\"",
                             escape_double = TRUE,
                             escape_backslash = FALSE,
                             col_names = TRUE,
                             col_types = NULL,
                             na = c("", "NA"),
                             skip_empty_rows = TRUE,
                             skip = 0L,
                             convert_options = NULL,
                             read_options = NULL,
                             timestamp_parsers = NULL,
                             quoted_na = TRUE,
                             parse_options = NULL) {
  mc <- match.call()
  mc$delim <- ","
  mc[[1]] <- get("open_delim_dataset", envir = asNamespace("arrow"))
  eval.parent(mc)
}

#' @rdname open_delim_dataset
#' @export
open_tsv_dataset <- function(sources,
                             schema = NULL,
                             partitioning = hive_partition(),
                             hive_style = NA,
                             unify_schemas = NULL,
                             factory_options = list(),
                             quote = "\"",
                             escape_double = TRUE,
                             escape_backslash = FALSE,
                             col_names = TRUE,
                             col_types = NULL,
                             na = c("", "NA"),
                             skip_empty_rows = TRUE,
                             skip = 0L,
                             convert_options = NULL,
                             read_options = NULL,
                             timestamp_parsers = NULL,
                             quoted_na = TRUE,
                             parse_options = NULL) {
  mc <- match.call()
  mc$delim <- "\t"
  mc[[1]] <- get("open_delim_dataset", envir = asNamespace("arrow"))
  eval.parent(mc)
}



#' Multi-file datasets
#'
#' @description
#' Arrow Datasets allow you to query against data that has been split across
#' multiple files. This sharding of data may indicate partitioning, which
#' can accelerate queries that only touch some partitions (files).
#'
#' A `Dataset` contains one or more `Fragments`, such as files, of potentially
#' differing type and partitioning.
#'
#' For `Dataset$create()`, see [open_dataset()], which is an alias for it.
#'
#' `DatasetFactory` is used to provide finer control over the creation of `Dataset`s.
#'
#' @section Factory:
#' `DatasetFactory` is used to create a `Dataset`, inspect the [Schema] of the
#' fragments contained in it, and declare a partitioning.
#' `FileSystemDatasetFactory` is a subclass of `DatasetFactory` for
#' discovering files in the local file system, the only currently supported
#' file system.
#'
#' For the `DatasetFactory$create()` factory method, see [dataset_factory()], an
#' alias for it. A `DatasetFactory` has:
#'
#' - `$Inspect(unify_schemas)`: If `unify_schemas` is `TRUE`, all fragments
#' will be scanned and a unified [Schema] will be created from them; if `FALSE`
#' (default), only the first fragment will be inspected for its schema. Use this
#' fast path when you know and trust that all fragments have an identical schema.
#' - `$Finish(schema, unify_schemas)`: Returns a `Dataset`. If `schema` is provided,
#' it will be used for the `Dataset`; if omitted, a `Schema` will be created from
#' inspecting the fragments (files) in the dataset, following `unify_schemas`
#' as described above.
#'
#' `FileSystemDatasetFactory$create()` is a lower-level factory method and
#' takes the following arguments:
#' * `filesystem`: A [FileSystem]
#' * `selector`: Either a [FileSelector] or `NULL`
#' * `paths`: Either a character vector of file paths or `NULL`
#' * `format`: A [FileFormat]
#' * `partitioning`: Either `Partitioning`, `PartitioningFactory`, or `NULL`
#' @section Methods:
#'
#' A `Dataset` has the following methods:
#' - `$NewScan()`: Returns a [ScannerBuilder] for building a query
#' - `$WithSchema()`: Returns a new Dataset with the specified schema.
#'   This method currently supports only adding, removing, or reordering
#'   fields in the schema: you cannot alter or cast the field types.
#' - `$schema`: Active binding that returns the [Schema] of the Dataset; you
#'   may also replace the dataset's schema by using `ds$schema <- new_schema`.
#'
#' `FileSystemDataset` has the following methods:
#' - `$files`: Active binding, returns the files of the `FileSystemDataset`
#' - `$format`: Active binding, returns the [FileFormat] of the `FileSystemDataset`
#'
#' `UnionDataset` has the following methods:
#' - `$children`: Active binding, returns all child `Dataset`s.
#'
#' @export
#' @seealso [open_dataset()] for a simple interface to creating a `Dataset`
Dataset <- R6Class("Dataset",
  inherit = ArrowObject,
  public = list(
    # @description
    # Start a new scan of the data
    # @return A [ScannerBuilder]
    NewScan = function() dataset___Dataset__NewScan(self),
    ToString = function() format_schema(self),
    WithSchema = function(schema) {
      assert_is(schema, "Schema")
      dataset___Dataset__ReplaceSchema(self, schema)
    }
  ),
  active = list(
    schema = function(schema) {
      if (missing(schema)) {
        dataset___Dataset__schema(self)
      } else {
        out <- self$WithSchema(schema)
        # WithSchema returns a new object but we're modifying in place,
        # so swap in that new C++ object pointer into our R6 object
        self$set_pointer(out$pointer())
        self
      }
    },
    metadata = function() self$schema$metadata,
    num_rows = function() self$NewScan()$Finish()$CountRows(),
    num_cols = function() length(self$schema),
    # @description
    # Return the Dataset's type.
    type = function() dataset___Dataset__type_name(self)
  )
)
Dataset$create <- open_dataset

#' @name FileSystemDataset
#' @rdname Dataset
#' @export
FileSystemDataset <- R6Class("FileSystemDataset",
  inherit = Dataset,
  public = list(
    .class_title = function() {
      nfiles <- length(self$files)
      file_type <- self$format$type
      pretty_file_type <- list(
        parquet = "Parquet",
        ipc = "Feather"
      )[[file_type]]

      paste(
        class(self)[[1]],
        "with",
        nfiles,
        pretty_file_type %||% file_type,
        ifelse(nfiles == 1, "file", "files")
      )
    }
  ),
  active = list(
    # @description
    # Return the files contained in this `FileSystemDataset`
    files = function() dataset___FileSystemDataset__files(self),
    # @description
    # Return the format of files in this `Dataset`
    format = function() {
      dataset___FileSystemDataset__format(self)
    },
    # @description
    # Return the filesystem of files in this `Dataset`
    filesystem = function() {
      dataset___FileSystemDataset__filesystem(self)
    }
  )
)

#' @name UnionDataset
#' @rdname Dataset
#' @export
UnionDataset <- R6Class("UnionDataset",
  inherit = Dataset,
  active = list(
    # @description
    # Return the UnionDataset's child `Dataset`s
    children = function() {
      dataset___UnionDataset__children(self)
    }
  )
)

#' @name InMemoryDataset
#' @rdname Dataset
#' @export
InMemoryDataset <- R6Class("InMemoryDataset", inherit = Dataset)
InMemoryDataset$create <- function(x) {
  stop_if_no_datasets()
  if (!inherits(x, "Table")) {
    x <- Table$create(x)
  }
  dataset___InMemoryDataset__create(x)
}


#' @export
names.Dataset <- function(x) names(x$schema)

#' @export
dim.Dataset <- function(x) c(x$num_rows, x$num_cols)

#' @export
dimnames.Dataset <- function(x) list(NULL, names(x))

#' @export
c.Dataset <- function(...) Dataset$create(list(...))

#' @export
as.data.frame.Dataset <- function(x, row.names = NULL, optional = FALSE, ...) {
  collect.Dataset(x)
}

#' @export
head.Dataset <- function(x, n = 6L, ...) {
  head(Scanner$create(x), n)
}

#' @export
tail.Dataset <- function(x, n = 6L, ...) {
  tail(Scanner$create(x), n)
}

#' @export
`[.Dataset` <- function(x, i, j, ..., drop = FALSE) {
  if (nargs() == 2L) {
    # List-like column extraction (x[i])
    return(x[, i])
  }
  if (!missing(j)) {
    x <- select.Dataset(x, all_of(j))
  }

  if (!missing(i)) {
    x <- take_dataset_rows(x, i)
  }
  x
}

take_dataset_rows <- function(x, i) {
  if (!is.numeric(i) || any(i < 0)) {
    stop("Only slicing with positive indices is supported", call. = FALSE)
  }
  scanner <- Scanner$create(x)
  i <- Array$create(i - 1)
  dataset___Scanner__TakeRows(scanner, i)
}

stop_if_no_datasets <- function() {
  if (!arrow_with_dataset()) {
    stop("This build of the arrow package does not support Datasets", call. = FALSE)
  }
}
